Merge remote-tracking branch 'origin/master' into prevent_extra_threads_in_specs

Andrew Cantino 8 年之前
父節點
當前提交
9bdbdfa10d

+ 21 - 7
app/concerns/long_runnable.rb

@@ -51,12 +51,13 @@ module LongRunnable
51 51
   end
52 52
 
53 53
   class Worker
54
-    attr_reader :thread, :id, :agent, :config, :mutex, :scheduler
54
+    attr_reader :thread, :id, :agent, :config, :mutex, :scheduler, :restarting
55 55
 
56 56
     def initialize(options = {})
57 57
       @id = options[:id]
58 58
       @agent = options[:agent]
59 59
       @config = options[:config]
60
+      @restarting = false
60 61
     end
61 62
 
62 63
     def run
@@ -65,6 +66,7 @@ module LongRunnable
65 66
 
66 67
     def run!
67 68
       @thread = Thread.new do
69
+        Thread.current[:name] = "#{id}-#{Time.now}"
68 70
         begin
69 71
           run
70 72
         rescue SignalException, SystemExit
@@ -90,18 +92,23 @@ module LongRunnable
90 92
       if respond_to?(:stop)
91 93
         stop
92 94
       else
93
-        terminate_thread
95
+        terminate_thread!
94 96
       end
95 97
     end
96 98
 
97
-    def terminate_thread
98
-      thread.terminate if thread
99
+    def terminate_thread!
100
+      if thread
101
+        thread.terminate
102
+        thread.wakeup if thread.status == 'sleep'
103
+      end
99 104
     end
100 105
 
101 106
     def restart!
102
-      stop!
103
-      setup!(scheduler, mutex)
104
-      run!
107
+      without_alive_check do
108
+        stop!
109
+        setup!(scheduler, mutex)
110
+        run!
111
+      end
105 112
     end
106 113
 
107 114
     def every(*args, &blk)
@@ -125,5 +132,12 @@ module LongRunnable
125 132
     def schedule(method, args, &blk)
126 133
       @scheduler.send(method, *args, tag: id, &blk)
127 134
     end
135
+
136
+    def without_alive_check(&blk)
137
+      @restarting = true
138
+      yield
139
+    ensure
140
+      @restarting = false
141
+    end
128 142
   end
129 143
 end

+ 1 - 0
app/models/agents/event_formatting_agent.rb

@@ -1,6 +1,7 @@
1 1
 module Agents
2 2
   class EventFormattingAgent < Agent
3 3
     cannot_be_scheduled!
4
+    can_dry_run!
4 5
 
5 6
     description <<-MD
6 7
       The Event Formatting Agent allows you to format incoming Events, adding new fields as needed.

+ 7 - 9
app/models/agents/twitter_stream_agent.rb

@@ -159,7 +159,7 @@ module Agents
159 159
         @filter_to_agent_map = @config[:filter_to_agent_map]
160 160
 
161 161
         schedule_in RELOAD_TIMEOUT do
162
-          puts "--> Restarting TwitterStream #{id}"
162
+          puts "--> Restarting TwitterStream #{id} at #{Time.now} <--"
163 163
           restart!
164 164
         end
165 165
       end
@@ -176,7 +176,7 @@ module Agents
176 176
 
177 177
       def stop
178 178
         EventMachine.stop_event_loop if EventMachine.reactor_running?
179
-        thread.terminate
179
+        terminate_thread!
180 180
       end
181 181
 
182 182
       private
@@ -199,16 +199,16 @@ module Agents
199 199
         end
200 200
 
201 201
         stream.on_error do |message|
202
-          STDERR.puts " --> Twitter error: #{message} <--"
202
+          STDERR.puts " --> Twitter error: #{message} at #{Time.now} <--"
203 203
         end
204 204
 
205 205
         stream.on_no_data do |message|
206
-          STDERR.puts " --> Got no data for awhile; trying to reconnect."
206
+          STDERR.puts " --> Got no data for awhile; trying to reconnect at #{Time.now} <--"
207 207
           restart!
208 208
         end
209 209
 
210 210
         stream.on_max_reconnects do |timeout, retries|
211
-          STDERR.puts " --> Oops, tried too many times! <--"
211
+          STDERR.puts " --> Oops, tried too many times! at #{Time.now} <--"
212 212
           sleep 60
213 213
           restart!
214 214
         end
@@ -222,20 +222,18 @@ module Agents
222 222
         status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, '  ')
223 223
 
224 224
         if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
225
-          puts "Skipping retweet: #{status["text"]}"
226 225
           return
227 226
         elsif @recent_tweets.include?(status["id_str"])
228
-          puts "Skipping duplicate tweet: #{status["text"]}"
227
+          puts "(#{Time.now}) Skipping duplicate tweet: #{status["text"]}"
229 228
           return
230 229
         end
231 230
 
232 231
         @recent_tweets << status["id_str"]
233 232
         @recent_tweets.shift if @recent_tweets.length > DUPLICATE_DETECTION_LENGTH
234
-        puts status["text"]
235 233
         @filter_to_agent_map.keys.each do |filter|
236 234
           next unless (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
237 235
           @filter_to_agent_map[filter].each do |agent|
238
-            puts " -> #{agent.name}"
236
+            puts "(#{Time.now}) #{agent.name} received: #{status["text"]}"
239 237
             AgentRunner.with_connection do
240 238
               agent.process_tweet(filter, status)
241 239
             end

+ 4 - 3
app/models/agents/website_agent.rb

@@ -264,8 +264,9 @@ module Agents
264 264
         error "Ignoring a non-HTTP url: #{url.inspect}"
265 265
         return
266 266
       end
267
-      log "Fetching #{url}"
268
-      response = faraday.get(url)
267
+      uri = Utils.normalize_uri(url)
268
+      log "Fetching #{uri}"
269
+      response = faraday.get(uri)
269 270
       raise "Failed: #{response.inspect}" unless response.success?
270 271
 
271 272
       interpolation_context.stack {
@@ -303,7 +304,7 @@ module Agents
303 304
           interpolated['extract'].keys.each do |name|
304 305
             result[name] = output[name][index]
305 306
             if name.to_s == 'url'
306
-              result[name] = (response.env[:url] + result[name]).to_s
307
+              result[name] = (response.env[:url] + Utils.normalize_uri(result[name])).to_s
307 308
             end
308 309
           end
309 310
 

+ 1 - 1
lib/agent_runner.rb

@@ -101,7 +101,7 @@ class AgentRunner
101 101
 
102 102
   def restart_dead_workers
103 103
     @workers.each_pair do |id, worker|
104
-      if worker.thread && !worker.thread.alive?
104
+      if !worker.restarting && worker.thread && !worker.thread.alive?
105 105
         puts "Restarting #{id.to_s}" unless Rails.env.test?
106 106
         @workers[id].run!
107 107
       end

+ 12 - 0
lib/utils.rb

@@ -21,6 +21,18 @@ module Utils
21 21
     end
22 22
   end
23 23
 
24
+  def self.normalize_uri(uri)
25
+    begin
26
+      URI(uri)
27
+    rescue URI::Error
28
+      URI(uri.to_s.gsub(/[^\-_.!~*'()a-zA-Z\d;\/?:@&=+$,\[\]]+/) { |unsafe|
29
+            unsafe.bytes.each_with_object(String.new) { |uc, s|
30
+              s << sprintf('%%%02X', uc)
31
+            }
32
+          }.force_encoding(Encoding::US_ASCII))
33
+    end
34
+  end
35
+
24 36
   def self.interpolate_jsonpaths(value, data, options = {})
25 37
     if options[:leading_dollarsign_is_jsonpath] && value[0] == '$'
26 38
       Utils.values_at(data, value).first.to_s

+ 25 - 3
spec/concerns/long_runnable_spec.rb

@@ -46,7 +46,7 @@ describe LongRunnable do
46 46
     end
47 47
 
48 48
     after(:each) do
49
-      @worker.thread.terminate if @worker.thread
49
+      @worker.thread.terminate if @worker.thread && !@skip_thread_terminate
50 50
       @scheduler.shutdown(:wait)
51 51
     end
52 52
 
@@ -81,7 +81,7 @@ describe LongRunnable do
81 81
 
82 82
     context "#stop!" do
83 83
       it "terminates the thread" do
84
-        mock.proxy(@worker).terminate_thread
84
+        mock.proxy(@worker).terminate_thread!
85 85
         @worker.stop!
86 86
       end
87 87
 
@@ -91,6 +91,28 @@ describe LongRunnable do
91 91
       end
92 92
     end
93 93
 
94
+    context "#terminate_thread!" do
95
+      before do
96
+        @skip_thread_terminate = true
97
+        mock_thread = Object.new
98
+        stub(@worker).thread { mock_thread }
99
+      end
100
+
101
+      it "terminates the thread" do
102
+        mock(@worker.thread).terminate
103
+        do_not_allow(@worker.thread).wakeup
104
+        mock(@worker.thread).status { 'run' }
105
+        @worker.terminate_thread!
106
+      end
107
+
108
+      it "wakes up sleeping threads after termination" do
109
+        mock(@worker.thread).terminate
110
+        mock(@worker.thread).wakeup
111
+        mock(@worker.thread).status { 'sleep' }
112
+        @worker.terminate_thread!
113
+      end
114
+    end
115
+
94 116
     context "#restart!" do
95 117
       it "stops, setups and starts the worker" do
96 118
         mock(@worker).stop!
@@ -117,4 +139,4 @@ describe LongRunnable do
117 139
       end
118 140
     end
119 141
   end
120
-end
142
+end

+ 17 - 0
spec/data_fixtures/urlTest.html

@@ -0,0 +1,17 @@
1
+<html>
2
+    <head>
3
+        <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
4
+        <title>test</title>
5
+    </head>
6
+    <body>
7
+        <ul>
8
+            <li><a href="http://google.com">google</a></li>
9
+            <li><a href="https://www.google.ca/search?q=some query">broken</a></li>
10
+            <li><a href="https://www.google.ca/search?q=some%20query">escaped</a></li>
11
+            <li><a href="http://ko.wikipedia.org/wiki/위키백과:대문">unicode url</a></li>
12
+            <li><a href="https://www.google.ca/search?q=위키백과:대문">unicode param</a></li>
13
+            <li><a href="http://ko.wikipedia.org/wiki/%EC%9C%84%ED%82%A4%EB%B0%B1%EA%B3%BC:%EB%8C%80%EB%AC%B8">percent encoded url</a></li>
14
+            <li><a href="https://www.google.ca/search?q=%EC%9C%84%ED%82%A4%EB%B0%B1%EA%B3%BC:%EB%8C%80%EB%AC%B8">percent encoded param</a></li>
15
+        </ul>
16
+    </body>
17
+</html>

+ 11 - 12
spec/models/agents/twitter_stream_agent_spec.rb

@@ -192,7 +192,7 @@ describe Agents::TwitterStreamAgent do
192 192
 
193 193
     context "#stop" do
194 194
       it "stops the thread" do
195
-        mock(@worker.thread).terminate
195
+        mock(@worker).terminate_thread!
196 196
         @worker.stop
197 197
       end
198 198
     end
@@ -221,20 +221,20 @@ describe Agents::TwitterStreamAgent do
221 221
       context "callback handling" do
222 222
         it "logs error messages" do
223 223
           stub_without(:on_error).on_error.yields('woups')
224
-          mock(STDERR).puts(" --> Twitter error: woups <--")
224
+          mock(STDERR).puts(anything) { |text| expect(text).to match(/woups/) }
225 225
           @worker.send(:stream!, ['agent'], @agent)
226 226
         end
227 227
 
228 228
         it "stop when no data was received"do
229 229
           stub_without(:on_no_data).on_no_data.yields
230 230
           mock(@worker).restart!
231
-          mock(STDERR).puts(" --> Got no data for awhile; trying to reconnect.")
231
+          mock(STDERR).puts(anything)
232 232
           @worker.send(:stream!, ['agent'], @agent)
233 233
         end
234 234
 
235 235
         it "sleeps for 60 seconds on_max_reconnects" do
236 236
           stub_without(:on_max_reconnects).on_max_reconnects.yields
237
-          mock(STDERR).puts(" --> Oops, tried too many times! <--")
237
+          mock(STDERR).puts(anything)
238 238
           mock(@worker).sleep(60)
239 239
           mock(@worker).restart!
240 240
           @worker.send(:stream!, ['agent'], @agent)
@@ -251,22 +251,21 @@ describe Agents::TwitterStreamAgent do
251 251
 
252 252
     context "#handle_status" do
253 253
       it "skips retweets" do
254
-        mock.instance_of(IO).puts('Skipping retweet: retweet')
255
-        @worker.send(:handle_status, {'text' => 'retweet', 'retweeted_status' => {one: true}})
254
+        @worker.send(:handle_status, {'text' => 'retweet', 'retweeted_status' => {one: true}, 'id_str' => '123' })
255
+        expect(@worker.instance_variable_get(:'@recent_tweets')).not_to include('123')
256 256
       end
257 257
 
258 258
       it "deduplicates tweets" do
259
-        mock.instance_of(IO).puts("dup")
260
-        @worker.send(:handle_status, {'text' => 'dup', 'id_str' => 1})
261
-        mock.instance_of(IO).puts("Skipping duplicate tweet: dup")
262
-        @worker.send(:handle_status, {'text' => 'dup', 'id_str' => 1})
259
+        @worker.send(:handle_status, {'text' => 'dup', 'id_str' => '1'})
260
+        @worker.send(:handle_status, {'text' => 'dup', 'id_str' => '1'})
261
+        expect(@worker.instance_variable_get(:'@recent_tweets').select { |str| str == '1' }.length).to eq 1
263 262
       end
264 263
 
265 264
       it "calls the agent to process the tweet" do
266
-        stub.instance_of(IO).puts
267 265
         mock(@mock_agent).name { 'mock' }
268 266
         mock(@mock_agent).process_tweet('agent', {'text' => 'agent'})
269
-        @worker.send(:handle_status, {'text' => 'agent'})
267
+        @worker.send(:handle_status, {'text' => 'agent', 'id_str' => '123'})
268
+        expect(@worker.instance_variable_get(:'@recent_tweets')).to include('123')
270 269
       end
271 270
     end
272 271
   end

+ 63 - 0
spec/models/agents/website_agent_spec.rb

@@ -911,4 +911,67 @@ fire: hot
911 911
       end
912 912
     end
913 913
   end
914
+
915
+  describe "checking urls" do
916
+    before do
917
+      stub_request(:any, /example/).
918
+        to_return(:body => File.read(Rails.root.join("spec/data_fixtures/urlTest.html")), :status => 200)
919
+      @valid_options = {
920
+        'name' => "Url Test",
921
+        'expected_update_period_in_days' => "2",
922
+        'type' => "html",
923
+        'url' => "http://www.example.com",
924
+        'mode' => 'all',
925
+        'extract' => {
926
+          'url' => { 'css' => "a", 'value' => "@href" },
927
+        }
928
+      }
929
+      @checker = Agents::WebsiteAgent.new(:name => "ua", :options => @valid_options)
930
+      @checker.user = users(:bob)
931
+      @checker.save!
932
+    end
933
+
934
+    describe "#check" do
935
+      before do
936
+        expect { @checker.check }.to change { Event.count }.by(7)
937
+        @events = Event.last(7)
938
+      end
939
+
940
+      it "should check hostname" do
941
+        event = @events[0]
942
+        expect(event.payload['url']).to eq("http://google.com")
943
+      end
944
+
945
+      it "should check unescaped query" do
946
+        event = @events[1]
947
+        expect(event.payload['url']).to eq("https://www.google.ca/search?q=some%20query")
948
+      end
949
+
950
+      it "should check properly escaped query" do
951
+        event = @events[2]
952
+        expect(event.payload['url']).to eq("https://www.google.ca/search?q=some%20query")
953
+      end
954
+
955
+      it "should check unescaped unicode url" do
956
+        event = @events[3]
957
+        expect(event.payload['url']).to eq("http://ko.wikipedia.org/wiki/%EC%9C%84%ED%82%A4%EB%B0%B1%EA%B3%BC:%EB%8C%80%EB%AC%B8")
958
+      end
959
+
960
+      it "should check unescaped unicode query" do
961
+        event = @events[4]
962
+        expect(event.payload['url']).to eq("https://www.google.ca/search?q=%EC%9C%84%ED%82%A4%EB%B0%B1%EA%B3%BC:%EB%8C%80%EB%AC%B8")
963
+      end
964
+
965
+      it "should check properly escaped unicode url" do
966
+        event = @events[5]
967
+        expect(event.payload['url']).to eq("http://ko.wikipedia.org/wiki/%EC%9C%84%ED%82%A4%EB%B0%B1%EA%B3%BC:%EB%8C%80%EB%AC%B8")
968
+      end
969
+
970
+      it "should check properly escaped unicode query" do
971
+        event = @events[6]
972
+        expect(event.payload['url']).to eq("https://www.google.ca/search?q=%EC%9C%84%ED%82%A4%EB%B0%B1%EA%B3%BC:%EB%8C%80%EB%AC%B8")
973
+      end
974
+
975
+    end
976
+  end
914 977
 end